大部分介紹Reactive Programming都一定會提到Backpressure
,可能放在第九天有點稍晚,但我覺得有基本的Reactor
觀念後再來看也不遲。
Reactive Programming 常常提到的backpressure
,這在現實裡的管道中也會有,在軟體提到的背壓(backpressure
)是指當生產者(producer
)生產超過消費者(consumer
),為了避免消費者被過多的資料所淹沒的一個保護機制。例如拿寶特瓶來喝水,背壓就是正常的喝水,而沒有了這個機制,就像是喝到一半有人用力捏寶特瓶,水就會來不及喝下去噴出來。
用程式碼來看介紹,interval
會隨著時間不斷的emits
data,delay會延遲Subscriber去接受,導致漸漸的DATA溢出報錯。
Flux.interval(Duration.ofMillis(1))
.log()
.delayElements(Duration.ofMillis(100))
.blockLast();
reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:233)
at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:132)
at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.![https://ithelp.ithome.com.tw/upload/images/20210923/201414186VL7Y0MpU6.png](https://ithelp.ithome.com.tw/upload/images/20210923/201414186VL7Y0MpU6.png)til.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
稍微調整一下,加上一個buffer就能避免太快速的報錯,但若是Buffer被塞滿了仍然是會出錯,因為interval
是根據時間不斷推送資料,較難去維持backpressure
的機制,這邊只是為了demo而使用。
Flux.interval(Duration.ofMillis(1))
.log()
.onBackpressureBuffer()
.concatMap(x -> Mono.delay(Duration.ofMillis(100)))
.blockLast();
range
則會定義出總共emit
的量,這時候透過limitRate(3)
是subscribers 只會每次要求3個而不會因為太多的資料量而導致錯誤,也就是如果publisher
可以根據下游(downstream)的要求來決定產生資料的速度,就算是有backpressure
的機制
Flux.range(1, 20)
.log()
.limitRate(3)
.subscribe();
Backpressure在Reactive Programming是蠻重要的觀念,後面的文章也會陸續提到Reactor的Api對應Backpressure會有一些處理機制。